#!/usr/bin/env python
# -*- coding:utf-8 -*-

"""
=====================
Kafka 装饰器
====================
"""

#Author : wu.zheng<wu.zheng@socialcredits.cn>


def kafka_producer(hosts, topic):
    """
    Kafka producer

    params:
    -------
    hosts:
    topic:
    """

    from kafka import SimpleProducer, KafkaClient

    def decorator(function):
        def _decorator(*args, **kwargs):
            kafka = KafkaClient(hosts)
            producer = SimpleProducer(
                kafka,
                async=True,
                batch_send_every_n=100,
                batch_send_every_t=60)

            for item in function(*args, **kwargs):
                message = repr(item)
                producer.send_messages(topic, message)
            kafka.close()
        return _decorator
    return decorator


def kafka_consumer(hosts, topic, group_id, count=0, is_iteration=False):
    """
    Kafka consumer
    params:
    -------

    hosts:
    topic:
    group_id:
    count:  需要添加数量
    is_iteration: 这个参数是为了和MQ 的装饰器结合使用的,会产生一个迭代器

    return:
    -------
    返回一个message 对象  有offset, value, partition 等属性
    """

    from kafka import KafkaConsumer

    if isinstance(hosts, basestring):
        hosts = [hosts]

    def decorator(function,):
        def _decorator(message,  *args, **kwargs):
            consumer = KafkaConsumer(topic,
                                     bootstrap_servers=hosts,
                                     group_id=group_id,
                                     auto_commit_enable=False,
                                     auto_commit_interval_ms=1*1000,
                                     fetch_message_max_bytes=1024*10240,
                                     fetch_min_bytes=1024*1024,
                                     fetch_wait_max_ms=10 ,
                                     auto_offset_reset='smallest'
                                     )
            total_message = 0
            index = 0
            for message in consumer:
                index += 1
                if not total_message:
                    total_message = consumer.get_partition_offsets(topic, message.partition, -1, 100)
                    total_message = total_message[0]
                result = function(message , *args, **kwargs)
                if is_iteration:
                    yield result

                consumer.task_done(message)
                if message.offset == total_message-1:
                    break
                if count and  index == count:
                    break
            consumer.commit()
            consumer.close()
        return _decorator
    return decorator


